Scala example using Spark SQL over Cloudant as a source

This sample notebook is written in Scala. Make sure the kernel is started and connected when executing this notebook.

The data source for this example can be found at: http://examples.cloudant.com/crimes/. Replicate the database into your own Cloudant account before you execute this script.

Watch the video

Once you import this notebook into Watson Studio, you will see an embedded video which walks you through the notebook.

1. Work with the Spark Context

A Spark Context handle sc is available with every notebook created in the Spark Service. Use it to check the Spark version and the environment settings. The cells below obtain a SparkSession, which is the entry point for Spark SQL.


In [ ]:
import org.apache.spark.sql.SparkSession

In [ ]:
val spark = SparkSession.builder().getOrCreate()
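
The version and environment settings mentioned above can be read from the session. This is a minimal sketch, assuming it runs in a notebook kernel that already provides a Spark master (outside such a kernel, getOrCreate() needs a master URL to be configured first). The settings printed depend entirely on your environment.

```scala
import org.apache.spark.sql.SparkSession

// Reuse the notebook's session if one exists, otherwise create one.
val spark = SparkSession.builder().getOrCreate()

// Spark version of the running service.
println(s"Spark version: ${spark.version}")

// Settings explicitly set on the underlying SparkContext, sorted by key.
spark.sparkContext.getConf.getAll.sorted.foreach { case (key, value) =>
  println(s"$key = $value")
}
```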

2. Work with a Cloudant database

A DataFrame object can be created directly from a Cloudant database. To configure the database as the source, pass these options:

1 - the package name that provides the classes (like CloudantDataSource) implemented in the connector to extend BaseRelation. For the Cloudant Spark connector this is org.apache.bahir.cloudant

2 - cloudant.host parameter to pass the Cloudant account host name (for example, xxxxxx.cloudant.com)

3 - cloudant.username parameter to pass the Cloudant user name

4 - cloudant.password parameter to pass the Cloudant account password

5 - the name of the database to load, passed to load()


In [ ]:
val cloudantdata = spark.read.format("org.apache.bahir.cloudant").
option("cloudant.host","xxxxxx.cloudant.com").
option("cloudant.username","xxxxxx").
option("cloudant.password","xxxxxx").
load("crimes")
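
The cell above hard-codes the credentials. As one possible alternative, the connection options can be collected in a Map and passed in a single options(...) call. The sketch below reads them from environment variables. The CLOUDANT_* variable names and the cloudantOptions helper are hypothetical, chosen only for this illustration.

```scala
// Build the connector options from a map of settings (for example sys.env).
// The CLOUDANT_* names are hypothetical and not defined by the connector.
def cloudantOptions(env: Map[String, String]): Map[String, String] = {
  // Fail early with a clear message if a setting is missing.
  def required(name: String): String =
    env.getOrElse(name, sys.error(s"Missing setting: $name"))
  Map(
    "cloudant.host"     -> required("CLOUDANT_HOST"),
    "cloudant.username" -> required("CLOUDANT_USERNAME"),
    "cloudant.password" -> required("CLOUDANT_PASSWORD")
  )
}

// Usage with the session created in section 1:
// val cloudantdata = spark.read.format("org.apache.bahir.cloudant").
//   options(cloudantOptions(sys.env)).
//   load("crimes")
```

Keeping the option keys in one place also lets the same Map be reused for the write in section 3.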

3. Work with a DataFrame

At this point all transformations and functions should behave as specified with Spark SQL (http://spark.apache.org/sql/).

This code prints the schema and a record count.


In [ ]:
cloudantdata.printSchema()
println(cloudantdata.count())

This code displays the values of the naturecode field.


In [ ]:
val resultsDF = cloudantdata.select("properties.naturecode")
resultsDF.show()

This code filters the data to just those records whose naturecode starts with "DISTRB", and then displays that data.


In [ ]:
val disturbDF = cloudantdata.filter(cloudantdata.col("properties.naturecode").startsWith("DISTRB"))
disturbDF.show()
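
Note that startsWith is a prefix match, so it also keeps codes that merely begin with "DISTRB". The sketch below contrasts it with an exact match and with the equivalent Spark SQL query over a temporary view. It uses a small in-memory DataFrame so that it runs without a Cloudant account. The sample values and the view name sample_crimes are made up for illustration and are not taken from the crimes database.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Made-up rows that mimic the nested "properties.naturecode" field.
val sample = Seq("DISTRB", "DISTRB2", "THEFT").toDF("naturecode").
  select(struct($"naturecode").as("properties"))

// Prefix match, as in the cell above: keeps "DISTRB" and "DISTRB2".
val prefixDF = sample.filter($"properties.naturecode".startsWith("DISTRB"))

// Exact match: keeps only "DISTRB".
val exactDF = sample.filter($"properties.naturecode" === "DISTRB")

// The same prefix filter expressed as a Spark SQL query over a temporary view.
sample.createOrReplaceTempView("sample_crimes")
val sqlDF = spark.sql(
  "SELECT properties.naturecode FROM sample_crimes " +
  "WHERE properties.naturecode LIKE 'DISTRB%'")

prefixDF.show()
exactDF.show()
sqlDF.show()
```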

This code writes the filtered data to a Cloudant database called crimes_filtered. If the database already exists, the documents are added to it. If the database does not exist, set the createDBOnSave option to "true" so that the connector creates it during the save.


In [ ]:
disturbDF.select("properties").write.format("org.apache.bahir.cloudant").
option("cloudant.host","xxxxxxx.cloudant.com").
option("cloudant.username","xxxxxxxx").
option("cloudant.password","xxxxxxxx").
option("createDBOnSave", "false").
save("crimes_filtered")
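
For the case where crimes_filtered does not exist yet, a variant of the cell above might look like the following sketch. It assumes the spark and disturbDF values from the earlier cells and uses the same placeholder credentials, which must be replaced before running. The read-back step is an addition for checking the result, not part of the original example.

```scala
// Assumes `spark` and `disturbDF` from the cells above.
// Placeholder credentials: replace with your own account details.
val target = Map(
  "cloudant.host"     -> "xxxxxxx.cloudant.com",
  "cloudant.username" -> "xxxxxxxx",
  "cloudant.password" -> "xxxxxxxx"
)

// Create crimes_filtered during the save instead of requiring it to exist.
disturbDF.select("properties").write.format("org.apache.bahir.cloudant").
  options(target).
  option("createDBOnSave", "true").
  save("crimes_filtered")

// Read the new database back to confirm that documents were written.
val written = spark.read.format("org.apache.bahir.cloudant").
  options(target).
  load("crimes_filtered")
println(written.count())
```

The connector documentation describes createDBOnSave set to "true" as raising an error when the database already exists, so this variant is best treated as a one-time setup step.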
